{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "\n",
    "import inspect\n",
    "import os\n",
    "import shutil\n",
    "from pathlib import Path\n",
    "from pprint import pprint\n",
    "from tempfile import TemporaryDirectory\n",
    "from typing import *\n",
    "\n",
    "import pytest\n",
    "from aiokafka import AIOKafkaProducer\n",
    "from fastcore.basics import patch\n",
    "from IPython.display import Markdown\n",
    "\n",
    "from fastkafka._helpers import get_collapsible_admonition, source2markdown\n",
    "from fastkafka.testing import mock_AIOKafkaProducer_send, run_script_and_cancel"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# | notest\n",
    "# | hide\n",
    "\n",
    "import nest_asyncio"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# | notest\n",
    "# | hide\n",
    "\n",
    "nest_asyncio.apply()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Using Redpanda to test FastKafka"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## What is FastKafka?\n",
    "\n",
    "[FastKafka](https://fastkafka.airt.ai/) is a powerful and easy-to-use Python library for building asynchronous services that interact with Kafka topics. Built on top of [Pydantic](https://docs.pydantic.dev/), [AIOKafka](https://github.com/aio-libs/aiokafka) and [AsyncAPI](https://www.asyncapi.com/), FastKafka simplifies the process of writing producers and consumers for Kafka topics, handling all the parsing, networking, task scheduling and data generation automatically. With FastKafka, you can quickly prototype and develop high-performance Kafka-based services with minimal code, making it an ideal choice for developers looking to streamline their workflow and accelerate their projects.\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## What is Redpanda?\n",
    "\n",
    "Redpanda is a drop-in replacement for Kafka. Most of the Kafka tools work out of the box with Redpanda.\n",
    "\n",
    "From [redpanda.com](https://redpanda.com/):\n",
    "\n",
    "> Redpanda is a Kafka®-compatible streaming data platform that is proven to be 10x faster and 6x lower in total costs. It is also JVM-free, ZooKeeper®-free, Jepsen-tested and source available.\n",
    "\n",
    "Some of the advantages of Redpanda over Kafka are\n",
    "\n",
    "1. A single binary with built-in everything, no ZooKeeper® or JVM needed.\n",
    "2. Costs upto 6X less than Kafka.\n",
    "3. Up to 10x lower average latencies and up to 6x faster Kafka transactions without compromising correctness.\n",
    "\n",
    "To learn more about Redpanda, please visit their [website](https://redpanda.com/) or checkout this [blog post](https://redpanda.com/blog/redpanda-vs-kafka-performance-benchmark) comparing Redpanda and Kafka's performance benchmarks."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Example repo\n",
    "\n",
    "A sample FastKafka-based library that uses Redpanda for testing, based on this guide, can be found [here](https://github.com/airtai/sample_fastkafka_with_redpanda)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## The process\n",
    "\n",
    "Here are the steps we’ll be walking through to build our example:\n",
    "\n",
    "1. Set up the prerequisites.\n",
    "2. Clone the example repo.\n",
    "3. Explain how to write an application using FastKafka.\n",
    "4. Explain how to write a test case to test FastKafka with Redpanda.\n",
    "5. Run the test case and produce/consume messages."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Prerequisites\n",
    "\n",
    "\n",
    "Before starting, make sure you have the following prerequisites set up:\n",
    "\n",
    "1. **Python 3.x**: A Python 3.x installation is required to run FastKafka. You can download the latest version of Python from the [official website](https://www.python.org/downloads/). You'll also need to have pip installed and updated, which is Python's package installer.\n",
    "2. **Docker Desktop**: Docker is used to run Redpanda, which is required for testing FastKafka. You can download and install Docker Desktop from the [official website](https://www.docker.com/products/docker-desktop/).\n",
    "3. **Git**: You'll need to have Git installed to clone the example repo. You can download Git from the [official website](https://git-scm.com/downloads)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Cloning and setting up the example repo\n",
    "\n",
    "To get started with the example code, clone the [GitHub repository](https://github.com/airtai/sample_fastkafka_with_redpanda) by running the following command in your terminal:\n",
    "\n",
    "```cmd\n",
    "git clone https://github.com/airtai/sample_fastkafka_with_redpanda.git\n",
    "cd sample_fastkafka_with_redpanda\n",
    "```\n",
    "\n",
    "This will create a new directory called sample_fastkafka_with_redpanda and download all the necessary files.\n",
    "\n",
    "\n",
    "### Create a virtual environment\n",
    "\n",
    "Before writing any code, let’s [create a new virtual environment](https://docs.python.org/3/library/venv.html#module-venv) for our project.\n",
    "\n",
    " A virtual environment is an isolated environment for a Python project, which allows you to manage project-specific dependencies and avoid conflicts between different projects.\n",
    "\n",
    "To create a new virtual environment, run the following commands in your terminal:\n",
    "\n",
    "```cmd\n",
    "python3 -m venv venv\n",
    "```\n",
    "\n",
    "This will create a new directory called `venv` in your project directory, which will contain the virtual environment.\n",
    "\n",
    "To activate the virtual environment, run the following command:\n",
    "\n",
    "```cmd\n",
    "source venv/bin/activate\n",
    "```\n",
    "\n",
    "This will change your shell's prompt to indicate that you are now working inside the virtual environment.\n",
    "\n",
    "Finally, run the following command to upgrade `pip`, the Python package installer:\n",
    "\n",
    "```cmd\n",
    "pip install --upgrade pip\n",
    "```\n",
    "\n",
    "\n",
    "### Install Python dependencies\n",
    "\n",
    "Next, let's install the required Python dependencies. In this guide, we'll be using `FastKafka` to write our application code and `pytest` and `pytest-asyncio` to test it.\n",
    "\n",
    "You can install the dependencies from the `requirements.txt` file provided in the cloned repository by running:\n",
    "\n",
    "```cmd\n",
    "pip install -r requirements.txt\n",
    "```\n",
    "\n",
    "This will install all the required packages and their dependencies."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Writing server code\n",
    "\n",
    "The `application.py` file in the cloned repository demonstrates how to use FastKafka to consume messages from a Kafka topic, make predictions using a predictive model, and publish the predictions to another Kafka topic. Here is an explanation of the code:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Preparing the demo model\n",
    "\n",
    "First we will prepare our model using the Iris dataset so that we can demonstrate the predictions using FastKafka. The following call downloads the dataset and trains the model.\n",
    "\n",
    "We will wrap the model creation into a lifespan of our app so that the model is created just before the app is started."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from contextlib import asynccontextmanager\n",
    "\n",
    "from sklearn.datasets import load_iris\n",
    "from sklearn.linear_model import LogisticRegression\n",
    "\n",
    "from fastkafka import FastKafka\n",
    "\n",
    "ml_models = {}\n",
    "\n",
    "\n",
    "@asynccontextmanager\n",
    "async def lifespan(app: FastKafka):\n",
    "    # Load the ML model\n",
    "    X, y = load_iris(return_X_y=True)\n",
    "    ml_models[\"iris_predictor\"] = LogisticRegression(random_state=0, max_iter=500).fit(\n",
    "        X, y\n",
    "    )\n",
    "    yield\n",
    "    # Clean up the ML models and release the resources\n",
    "    ml_models.clear()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Messages\n",
    "\n",
    "FastKafka uses [Pydantic](https://docs.pydantic.dev/) to parse input JSON-encoded data into Python objects, making it easy to work with structured data in your Kafka-based applications. Pydantic's [`BaseModel`](https://docs.pydantic.dev/usage/models/) class allows you to define messages using a declarative syntax, making it easy to specify the fields and types of your messages.\n",
    "\n",
    "This example defines two message classes for use in a FastKafka application:\n",
    "\n",
    "- The `IrisInputData` class is used to represent input data for a predictive model. It has four fields of type [`NonNegativeFloat`](https://docs.pydantic.dev/latest/api/types/#pydantic.types.NonNegativeFloat), which is a subclass of float that only allows non-negative floating point values.\n",
    "\n",
    "- The `IrisPrediction` class is used to represent the output of the predictive model. It has a single field `species` of type string representing the predicted species.\n",
    "\n",
    "These message classes will be used to parse and validate incoming data in Kafka consumers and producers."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pydantic import BaseModel, Field, NonNegativeFloat\n",
    "\n",
    "\n",
    "class IrisInputData(BaseModel):\n",
    "    sepal_length: NonNegativeFloat = Field(\n",
    "        ..., example=0.5, description=\"Sepal length in cm\"\n",
    "    )\n",
    "    sepal_width: NonNegativeFloat = Field(\n",
    "        ..., example=0.5, description=\"Sepal width in cm\"\n",
    "    )\n",
    "    petal_length: NonNegativeFloat = Field(\n",
    "        ..., example=0.5, description=\"Petal length in cm\"\n",
    "    )\n",
    "    petal_width: NonNegativeFloat = Field(\n",
    "        ..., example=0.5, description=\"Petal width in cm\"\n",
    "    )\n",
    "\n",
    "\n",
    "class IrisPrediction(BaseModel):\n",
    "    species: str = Field(..., example=\"setosa\", description=\"Predicted species\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Application\n",
    "\n",
    "This example shows how to initialize a FastKafka application.\n",
    "\n",
    "It starts by defining a dictionary called `kafka_brokers`, which contains two entries: `\"localhost\"` and `\"production\"`, specifying local development and production Kafka brokers. Each entry specifies the URL, port, and other details of a Kafka broker. This dictionary is used both to generate documentation and to later run the server against one of the given kafka broker.\n",
    "\n",
    "Next, an instance of the `FastKafka` class is initialized with the minimum required arguments:\n",
    "\n",
    "- `kafka_brokers`: a dictionary used for generating documentation\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from fastkafka import FastKafka\n",
    "\n",
    "kafka_brokers = {\n",
    "    \"localhost\": {\n",
    "        \"url\": \"localhost\",\n",
    "        \"description\": \"local development kafka broker\",\n",
    "        \"port\": 9092,\n",
    "    },\n",
    "    \"production\": {\n",
    "        \"url\": \"kafka.airt.ai\",\n",
    "        \"description\": \"production kafka broker\",\n",
    "        \"port\": 9092,\n",
    "        \"protocol\": \"kafka-secure\",\n",
    "        \"security\": {\"type\": \"plain\"},\n",
    "    },\n",
    "}\n",
    "\n",
    "kafka_app = FastKafka(\n",
    "    title=\"Iris predictions\",\n",
    "    kafka_brokers=kafka_brokers,\n",
    "    lifespan=lifespan,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Function decorators\n",
    "\n",
    "FastKafka provides convenient function decorators `@kafka_app.consumes` and `@kafka_app.produces` to allow you to delegate the actual process of\n",
    "\n",
    "- consuming and producing data to Kafka, and\n",
    "\n",
    "- decoding and encoding JSON encode messages\n",
    "\n",
    "from user defined functions to the framework. The FastKafka framework delegates these jobs to AIOKafka and Pydantic libraries.\n",
    "\n",
    "These decorators make it easy to specify the processing logic for your Kafka consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying Kafka integration."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This following example shows how to use the `@kafka_app.consumes` and `@kafka_app.produces` decorators in a FastKafka application:\n",
    "\n",
    "- The `@kafka_app.consumes` decorator is applied to the `on_input_data` function, which specifies that this function should be called whenever a message is received on the \"input_data\" Kafka topic. The `on_input_data` function takes a single argument which is expected to be an instance of the `IrisInputData` message class. Specifying the type of the single argument is instructing the Pydantic to use `IrisInputData.parse_raw()` on the consumed message before passing it to the user defined function `on_input_data`.\n",
    "\n",
    "- The `@produces` decorator is applied to the `to_predictions` function, which specifies that this function should produce a message to the \"predictions\" Kafka topic whenever it is called. The `to_predictions` function takes a single integer argument `species_class` representing one of three possible strign values predicted by the mdoel. It creates a new `IrisPrediction` message using this value and then returns it. The framework will call the `IrisPrediction.json().encode(\"utf-8\")` function on the returned value and produce it to the specified topic."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@kafka_app.consumes(topic=\"input_data\", auto_offset_reset=\"latest\")\n",
    "async def on_input_data(msg: IrisInputData):\n",
    "    species_class = ml_models[\"iris_predictor\"].predict(\n",
    "        [[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]\n",
    "    )[0]\n",
    "\n",
    "    await to_predictions(species_class)\n",
    "\n",
    "\n",
    "@kafka_app.produces(topic=\"predictions\")\n",
    "async def to_predictions(species_class: int) -> IrisPrediction:\n",
    "    iris_species = [\"setosa\", \"versicolor\", \"virginica\"]\n",
    "\n",
    "    prediction = IrisPrediction(species=iris_species[species_class])\n",
    "    return prediction"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4. Writing the test code\n",
    "\n",
    "The service can be tested using the `Tester` instance which can be configured to start a [Redpanda broker](../../api/fastkafka/testing/LocalRedpandaBroker/) for testing purposes. The `test.py` file in the cloned repository contains the following code for testing."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pytest\n",
    "from application import IrisInputData, IrisPrediction, kafka_app\n",
    "\n",
    "from fastkafka.testing import Tester\n",
    "\n",
    "msg = IrisInputData(\n",
    "    sepal_length=0.1,\n",
    "    sepal_width=0.2,\n",
    "    petal_length=0.3,\n",
    "    petal_width=0.4,\n",
    ")\n",
    "\n",
    "\n",
    "@pytest.mark.asyncio\n",
    "async def test():\n",
    "    # Start Tester app and create local Redpanda broker for testing\n",
    "    async with Tester(kafka_app).using_local_redpanda(\n",
    "        tag=\"v23.1.2\", listener_port=9092\n",
    "    ) as tester:\n",
    "        # Send IrisInputData message to input_data topic\n",
    "        await tester.to_input_data(msg)\n",
    "\n",
    "        # Assert that the kafka_app responded with IrisPrediction in predictions topic\n",
    "        await tester.awaited_mocks.on_predictions.assert_awaited_with(\n",
    "            IrisPrediction(species=\"setosa\"), timeout=2\n",
    "        )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `Tester` module utilizes uses `LocalRedpandaBroker` to start and stop a Redpanda broker for testing purposes using Docker"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 5. Running the tests\n",
    "\n",
    "We can run the tests which is in `test.py` file by executing the following command:\n",
    "\n",
    "```cmd\n",
    "pytest test.py\n",
    "```\n",
    "\n",
    "This will start a Redpanda broker using Docker and executes tests. The output of the command is:\n",
    "\n",
    "```cmd\n",
    "(venv) fastkafka@airt-ai:~/dev/sample_fastkafka_with_redpanda$ pytest\n",
    "============================== test session starts ===============================\n",
    "platform linux -- Python 3.10.6, pytest-7.2.2, pluggy-1.0.0\n",
    "rootdir: /home/kumaran/dev/sample_fastkafka_with_redpanda, configfile: pytest.ini, testpaths: test.py\n",
    "plugins: asyncio-0.21.0, anyio-3.6.2\n",
    "asyncio: mode=strict\n",
    "collected 1 item                                                                 \n",
    "\n",
    "test.py .                                                                  [100%]\n",
    "\n",
    "=============================== 1 passed in 7.28s ================================\n",
    "(venv) fastkafka@airt-ai:~/dev/sample_fastkafka_with_redpanda$\n",
    "```\n",
    "\n",
    "Running the tests with the Redpanda broker ensures that your code is working correctly with a real Kafka-like message broker, making your tests more reliable. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Recap\n",
    "\n",
    "We have created an Iris classification model and encapulated it into our `FastKafka` application.\n",
    "The app will consume the `IrisInputData` from the `input_data` topic and produce the predictions to `predictions` topic.\n",
    "\n",
    "To test the app we have:\n",
    "\n",
    "1. Created the app\n",
    "\n",
    "2. Started our `Tester` class with `Redpanda` broker which mirrors the developed app topics for testing purposes\n",
    "\n",
    "3. Sent `IrisInputData` message to `input_data` topic\n",
    "\n",
    "4. Asserted and checked that the developed iris classification service has reacted to `IrisInputData` message "
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
